<?php
/**
 * 延迟队列执行脚本，从队列中取出数据，判断分集是否全部注入EPG成功，如果成功，则将绑定关系/解绑定关系从延迟队列放入到透传队列中
 * @author chunyang.shu
 * @date 2018-01-29
 */
header('Content-type: text/html; charset=utf-8');
ini_set('display_errors', 1);
set_time_limit(0);
include_once dirname(dirname(dirname(__FILE__))) . '/mgtv_init.php';
$arr_dir = explode('|',str_replace(array('/','\\'), '|', dirname(__DIR__)));
$str_dir = array_pop($arr_dir);
define('ORG_ID', $str_dir);
include_once dirname(dirname(dirname(__FILE__))) . '/' . ORG_ID . '/init.php';
include_once dirname(dirname(dirname(dirname(dirname(__FILE__))))) . '/np/np_redis_check.class.php';
include_once dirname(dirname(dirname(dirname(__FILE__)))) . '/nn_logic/c2_task/c2_task.class.php';
include_once dirname(dirname(dirname(__FILE__))) . '/models/queue_task_model.php';
class delayed_queue_execute extends nn_timer
{
    /**
     * 单次脚本处理数据最大条数
     * @var int
     */
    private $int_execute_data_max_num = 1000;

    /**
     * 执行定时任务
     * @param $params 参数
     * @author chunyang.shu
     * @date 2018-01-29
     */
    public function action($params = null)
    {
        $this->msg('开始执行...');
        $this->do_timer_action();
        $this->msg('执行结束...');
    }

    /**
     * 执行定时任务方法
     * @author chunyang.shu
     * @date 2018-01-29
     */
    private function do_timer_action()
    {
        // 获取延迟队列中的数据
        $arr_redis_config = array(
            'queue_name'        => 'htbk',
            'command_lose_time' => 86400
        );
        $obj_redis = nl_get_redis();
        $obj_redis_check = new np_redis_check_class($obj_redis, $arr_redis_config);
        $arr_list = $obj_redis_check->get_command($this->int_execute_data_max_num);
        if (empty($arr_list))
        {
            $this->msg('延迟队列中没有数据');
            return true;
        }

        // 创建DC对象
        $obj_dc = nl_get_dc(array (
            'db_policy'     => NL_DB_WRITE,
            'cache_policy'  => NP_KV_CACHE_TYPE_MEMCACHE
        ));
        $obj_dc->open();

        // 创建queue_task对象
        $obj_queue_task_model = new queue_task_model();

        // 循环处理延迟队列中的数据
        $arr_back_list = array();
        foreach ($arr_list as $arr_command_item)
        {
            $this->msg('延迟队列数据为：' . $arr_command_item['command']);
            $arr_item = json_decode($arr_command_item['command'], true);
            if (!is_array($arr_item) || empty($arr_item))
            {
                $this->msg('延迟队列数据格式错误，不能解析为数组');
                return false;
            }
            $this->msg('延迟队列数据解析成数组为：' . var_export($arr_item, true));

            // 根据不同的操作类型做不同的处理
            $bool_pass_queue_enabled = true;
            if (!isset($arr_item['action']) || strlen($arr_item['action']) == 0)
            {
                $this->msg('延迟队列数据操作类型为空');
                continue;
            }
            if ($arr_item['action'] == 'asset_package')
            {
                // 查询主媒资的注入状态，如果主媒资注入EPG还没有成功，则还不能将数据放入到透传队列
                if (!isset($arr_item['vod_id']) || strlen($arr_item['vod_id']) == 0)
                {
                    $this->msg('主媒资绑定/解绑分集，主媒资信息为空');
                    continue;
                }
                $arr_params = array(
                    'nns_ref_id'    => $arr_item['vod_id'],
                    'nns_type'      => 'video',
                    'nns_org_id'    => 'hljdx',
                    'nns_epg_status'=> '99',
                    'in'            => array(
                        'nns_action'=> array('add', 'modify')
                    )
                );
                $str_order = 'order by nns_create_time desc';
                $arr_vod_c2_info = nl_c2_task::timer_get_c2_task($obj_dc, $arr_params, 1, '', $str_order);
                if ($arr_vod_c2_info['ret'] != 0)
                {
                    $this->msg('查询主媒资注入EPG状态，logic返回失败，返回数据为：' . var_export($arr_vod_c2_info, true));
                    return false;
                }
                if (!is_array($arr_vod_c2_info['data_info']) || empty($arr_vod_c2_info['data_info']))
                {
                    $this->msg('查询主媒资注入EPG状态，没有数据');
                    $bool_pass_queue_enabled = false;
                }
                else
                {
                    // 解析分集ID列表，数据格式为：分集id|分集号|分集类型|分集cp_id|分集状态,分集id|分集号|分集类型|分集cp_id|分集状态
                    if (!isset($arr_item['index']) || strlen($arr_item['index']) == 0)
                    {
                        $this->msg('主媒资绑定/解绑分集，分集信息为空');
                        continue;
                    }
                    $arr_index_ids = array();
                    $arr_index_info_list = explode(',', $arr_item['index']);
                    foreach ($arr_index_info_list as $str_index_info)
                    {
                        $arr_index_info = explode('|', $str_index_info);
                        if (isset($arr_index_info[0]) && strlen($arr_index_info[0]) > 0)
                        {
                            $arr_index_ids[] = $arr_index_info[0];
                        }
                    }
                    if (empty($arr_index_ids))
                    {
                        $this->msg('主媒资不绑定/解绑定分集，分集信息为空');
                        continue;
                    }

                    // 查询分集注入EPG的状态，如果所有分集都注入EPG成功，则将延迟队列数据放入透传队列
                    $arr_params = array(
                        'nns_type'      => 'index',
                        'nns_org_id'    => 'hljdx',
                        'nns_epg_status'=> '99',
                        'in'            => array(
                            'nns_ref_id'    => $arr_index_ids,
                            'nns_action'    => array('add', 'modify')
                        )
                    );
                    $str_group = 'group by nns_ref_id';
                    $arr_index_c2_list = nl_c2_task::timer_get_c2_task(
                        $obj_dc,
                        $arr_params,
                        $this->int_execute_data_max_num,
                        $str_group,
                        $str_order
                    );
                    if ($arr_index_c2_list['ret'] != 0)
                    {
                        $this->msg('查询分集注入EPG状态，logic返回失败，返回数据为：' . var_export($arr_index_c2_list, true));
                        return false;
                    }
                    if (!is_array($arr_index_c2_list['data_info']) || empty($arr_index_c2_list['data_info']))
                    {
                        $this->msg('查询分集注入EPG状态，没有数据');
                        $bool_pass_queue_enabled = false;
                    }
                    else
                    {
                        $arr_c2_index_ids = array_keys(np_array_rekey($arr_index_c2_list['data_info'], 'nns_ref_id'));
                        $arr_in_import_index_ids = array_diff($arr_index_ids, $arr_c2_index_ids);
                        if (!empty($arr_in_import_index_ids))
                        {
                            $this->msg('存在分集还没有成功注入EPG，分集ID为：' . var_export($arr_in_import_index_ids, true));
                            $bool_pass_queue_enabled = false;
                        }
                        else
                        {
                            if (!isset($arr_item['data']) || empty($arr_item['data']))
                            {
                                $this->msg('透传队列数据为空');
                                continue;
                            }
                            $bool_pass_queue_enabled = true;
                        }
                    }
                }
            }
            else
            {
                continue;
            }

            // 如果分集都已经注入EPG成功，将数据加入到透传队列，否则将数据重新放回延迟队列
            if ($bool_pass_queue_enabled)
            {
                $this->msg('将延迟队列中的数据放入到透传队列中，数据为：' . var_export($arr_item['data'], true));
                $arr_result = $obj_queue_task_model->q_add_pass_queue_op_mgtv(
                    $arr_item['data']['type'],
                    $arr_item['data']['action'],
                    $arr_item['data']['cp_id'],
                    $arr_item['data']['pass_content'],
                    $arr_item['message_id']
                );
                if ($arr_result['ret'] != 0)
                {
                    $this->msg('将延迟队列中的数据放入到透传队列logic返回失败，返回数据为：' . var_export($arr_result, true));
                    return false;
                }
            }
            else
            {
                $arr_item['execute_time'] = date('Y-m-d H:i:s');
                $arr_back_list[] = array(
                    'time'      => $arr_command_item['time'],
                    'command'   => json_encode($arr_item)
                );
            }
        }

        // 回收队列数据
        if (!empty($arr_back_list))
        {
            $obj_redis_check->recover_command($arr_back_list);
        }

        // 返回结果
        return true;
    }
}

$obj_delayed_queue_execute = new delayed_queue_execute('delayed_queue_execute', 'public', __FILE__);
$obj_delayed_queue_execute->run();